Conversation

Member

@kmos kmos commented Dec 30, 2025

@kmos kmos requested review from Naros, jpechane and mfvitale January 8, 2026 10:39
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>

Contributor

Is it really provided?

Member Author

quarkus-debezium-engine-spi doesn't work without the engine or a connector that contains the connect-api, but I can change the scope.
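
For reference, a minimal sketch of the declaration under discussion; the closing tag and the comment are added here for illustration, and version management is assumed to be unchanged from the PR:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <!-- provided: available at compile time but not transitive, assumed to be supplied
         at runtime (here by the engine or a connector); the compile scope (the Maven
         default) would instead propagate the dependency to consumers of this module -->
    <scope>provided</scope>
</dependency>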


import org.apache.kafka.connect.source.SourceRecord;

public interface BatchEvent {

Contributor

Do we really want to operate at this level? Can't we extend the CapturingEvent class to add commit or similar methods?

Suggested change:
- public interface BatchEvent {
+ public interface ComittingCapturingEvent extends CapturingEvent {

Member Author

I am not sure about the name. In any case, this class contains the record both as a SourceRecord and in serialized form. In some cases, I have noticed that sinks use both (the SourceRecord and the serialized form). I didn't use CapturingEvent because I used that concept for the entire list of messages.
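
For illustration only, a minimal sketch of an event type along those lines; the accessor names record() and serialized() are assumptions, not the PR's actual API:

import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical sketch: a single event exposes both representations, since some
// sinks need the original SourceRecord as well as its serialized form.
public interface BatchEvent {

    // the Kafka Connect record emitted by the connector
    SourceRecord record();

    // the serialized payload produced for the configured format
    byte[] serialized();
}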

private final AtomicInteger isCapturingFilteredEvent = new AtomicInteger(0);

@Capturing(destination = "topic.inventory.products")
public void capture(CapturingEvents<BatchEvent> events) {

Contributor

Can't we take a slightly different approach, so that the user would use something like:

Suggested change:
- public void capture(CapturingEvents<BatchEvent> events) {
+ public void capture(List<CommittedCapturingEvent> events) {

Member Author

I used an approach with CapturingEvents<?> because the events flow with some common details:

  • engine (in a multi-engine configuration, for example)
  • source
  • destination

All the events should have the same engine, source and destination, so without the wrapper this information would be repeated in every event.
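
As a rough sketch of that idea, with hypothetical method names beyond the engine(), source() and destination() accessors mentioned above:

import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the batch carries the shared details once instead of
// repeating them on every event.
public interface CapturingEvents<T> extends Iterable<T> {

    // engine for which the events are emitted (relevant in multi-engine setups)
    String engine();

    // source the events were captured from
    String source();

    // destination the events are routed to
    String destination();

    // the events of the batch
    List<T> events();

    @Override
    default Iterator<T> iterator() {
        return events().iterator();
    }
}

Making the batch Iterable is only one possible design choice; it lets user code loop over the events directly while reading the shared metadata once.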

private final Logger logger = LoggerFactory.getLogger(GeneralChangeConsumer.class);

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records,

Member

@mfvitale mfvitale Jan 14, 2026


Could you try to improve the readability of the method?
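
Not the PR's actual implementation, but one illustrative way to make handleBatch read top-down is to extract the per-record work into a named helper:

import java.util.List;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

// Illustrative refactoring sketch only; dispatch() is a hypothetical helper.
public class GeneralChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records,
                            DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            dispatch(record);                // hand the event to the registered consumer
            committer.markProcessed(record); // acknowledge the single record
        }
        committer.markBatchFinished();       // acknowledge the whole batch
    }

    private void dispatch(ChangeEvent<Object, Object> record) {
        // hypothetical helper: resolve the target @Capturing method and invoke it
    }
}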

/**
* @return engine for which the events are emitted
*/
String engine();

Member

@mfvitale mfvitale Jan 14, 2026


Do you think it makes sense to add a void commitBatch method here, instead of calling it automatically?

Member Author

Do you mean markBatchFinished()? Yeah, I was thinking about that. All the existing debezium-server sinks call markBatchFinished() as their last statement without taking any action on it, so I postponed the development of that API. In any case, I'd prefer to find a different name.

Member

Let's look at the differences:

  • auto commit of the batch: if there is an error processing a particular record of the batch, the batch will still be committed, so the already-processed records will not be re-processed if a restart occurs.
  • explicit batch commit: if an error occurs, the user could decide to still commit the batch (same behavior as above), or decide not to commit it so that the whole batch is re-processed.

Honestly, I don't know whether reprocessing the batch is something that's really used. So maybe we can keep the auto commit?

@jpechane @Naros WDYT in this regard?
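
To make that trade-off concrete, a hypothetical sketch of the two variants from the user's point of view; commitBatch(), events() and send() are illustrative names, not the PR's API, and imports of the extension's types are omitted:

public class ProductsCapture {

    // Variant 1 (auto commit): the extension commits the batch once this method
    // returns, so records processed before a failure are not re-delivered.
    @Capturing(destination = "topic.inventory.products")
    public void captureAuto(CapturingEvents<BatchEvent> events) {
        for (BatchEvent event : events.events()) {
            send(event);
        }
    }

    // Variant 2 (explicit commit): the user decides whether to commit; if send()
    // throws, the commit is skipped and the whole batch is re-processed on restart.
    @Capturing(destination = "topic.inventory.products")
    public void captureExplicit(CapturingEvents<BatchEvent> events) {
        for (BatchEvent event : events.events()) {
            send(event);
        }
        events.commitBatch(); // hypothetical explicit commit
    }

    private void send(BatchEvent event) {
        // deliver the event to the downstream system
    }
}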

@mfvitale
Member

Overall LGTM, I just left some minor comments.

Development

Successfully merging this pull request may close these issues.

Manage batch processing in Debezium Extensions for Quarkus
